{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "optimum-baptist",
   "metadata": {},
   "source": [
    "# STSI demo using Sentinel-2 and ICESat-2 data\n",
    "# STSI used temporal slicing index\n",
    "## This demo could be used to find Spatio-temporal intersection between the two datasets and the potential area of interest"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "adapted-employee",
   "metadata": {},
   "source": [
    "## Input date range for query, we have dataset collected in 2020 in this demo"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "absent-franchise",
   "metadata": {},
   "outputs": [],
   "source": [
    "### test temporal index\n",
    "## the basic idea is to generate a temporal id, and classify data into categories according to time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "sealed-monte",
   "metadata": {},
   "outputs": [],
   "source": [
    "# date in the format of \"yyy-MM-dd\", results include both start date and end date\n",
    "\n",
    "# The hour delay variable represents how many hours apart the two datasets are considered to be temporally intersected\n",
    "# E.g. if hour_delay = 6, for 2020-06-26 12:00:00, timestamps between 2020-06-26 06:00:00 and 2020-06-26 18:00:00 are considered to be temporally intersected\n",
    "start_date = '2020-01-01'\n",
    "end_date = '2020-06-30'\n",
    "duration_month = 6 # for export\n",
    "time_index_hour_length = 72\n",
    "hour_delay = 6"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "descending-filename",
   "metadata": {},
   "source": [
    "## please input the potential area of interest, we have \"Beaufort_Sea\" and \"Wandel_Sea\" in in this demo\n",
    "## if not using AOIs, could leave blank"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "88d44b8c-d513-4376-843f-d1578f39de47",
   "metadata": {},
   "source": [
    "## please input the potential area of interest, we have \"Beaufort_Sea\" and \"Wandel_Sea\" in in this demo"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "above-runner",
   "metadata": {},
   "outputs": [],
   "source": [
    "Paoi = 'Wandel_Sea'"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "prospective-whale",
   "metadata": {},
   "source": [
    "## First, setup the Spark and Sedona environment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "willing-forge",
   "metadata": {},
   "outputs": [],
   "source": [
    "import findspark\n",
    "#findspark.init() "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "threatened-welding",
   "metadata": {},
   "outputs": [],
   "source": [
    "SPARK_HOME='/opt/cloudera/parcels/CDH/lib/spark'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "athletic-sally",
   "metadata": {},
   "outputs": [],
   "source": [
    "findspark.init(SPARK_HOME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "mediterranean-nylon",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/zhangp/.conda/envs/py37_environment/lib/python3.7/site-packages/geopandas/_compat.py:58: UserWarning: The installed version of PyGEOS is too old (0.6 installed, 0.8 required), and thus GeoPandas will not use PyGEOS.\n",
      "  UserWarning,\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "import os\n",
    "import codecs\n",
    "import subprocess\n",
    "#from hdfs import InsecureClient\n",
    "import numpy as np\n",
    "#from pyspark import SparkContext\n",
    "from pyspark import SQLContext\n",
    "from pyspark.sql import Row\n",
    "from pyspark.sql import functions as F\n",
    "from pyspark.sql.types import *\n",
    "import rtree\n",
    "from pyspark.sql import Window\n",
    "#import igraph\n",
    "#from igraph import Graph\n",
    "import geofeather"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "annoying-dutch",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark import StorageLevel\n",
    "import geopandas as gpd\n",
    "import pandas as pd\n",
    "from pyspark.sql.types import StructType\n",
    "from pyspark.sql.types import StructField\n",
    "from pyspark.sql.types import StringType\n",
    "from pyspark.sql.types import LongType\n",
    "from shapely.geometry import Point\n",
    "from shapely.geometry import Polygon\n",
    "\n",
    "from sedona.register import SedonaRegistrator\n",
    "from sedona.core.SpatialRDD import SpatialRDD\n",
    "from sedona.core.SpatialRDD import PointRDD\n",
    "from sedona.core.SpatialRDD import PolygonRDD\n",
    "from sedona.core.SpatialRDD import LineStringRDD\n",
    "from sedona.core.enums import FileDataSplitter\n",
    "from sedona.utils.adapter import Adapter\n",
    "from sedona.core.spatialOperator import KNNQuery\n",
    "from sedona.core.spatialOperator import JoinQuery\n",
    "from sedona.core.spatialOperator import JoinQueryRaw\n",
    "from sedona.core.spatialOperator import RangeQuery\n",
    "from sedona.core.spatialOperator import RangeQueryRaw\n",
    "from sedona.core.formatMapper.shapefileParser import ShapefileReader\n",
    "from sedona.core.formatMapper import WkbReader\n",
    "from sedona.core.formatMapper import WktReader\n",
    "from sedona.core.formatMapper import GeoJsonReader\n",
    "from sedona.sql.types import GeometryType\n",
    "from sedona.core.enums import GridType\n",
    "from sedona.core.SpatialRDD import RectangleRDD\n",
    "from sedona.core.enums import IndexType\n",
    "from sedona.core.geom.envelope import Envelope\n",
    "from sedona.utils import SedonaKryoRegistrator, KryoSerializer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "broken-kruger",
   "metadata": {},
   "outputs": [],
   "source": [
    "os.environ['PYSPARK_PYTHON'] = \"./environment/bin/python\"\n",
    "os.environ['YARN_CONF_DIR'] = \"/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "directed-alexander",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession \\\n",
    ".builder \\\n",
    ".appName(\"test_test_1\") \\\n",
    ".master('yarn') \\\n",
    ".config(\"spark.serializer\", KryoSerializer.getName) \\\n",
    ".config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName) \\\n",
    ".config('spark.jars','sedona-core-2.4_2.11-1.0.0-incubating.jar,sedona-sql-2.4_2.11-1.0.0-incubating.jar,sedona-python-adapter-2.4_2.11-1.0.0-incubating.jar,sedona-viz-2.4_2.11-1.0.0-incubating.jar,geotools-wrapper-geotools-24.0.jar') \\\n",
    ".config('spark.executor.memory', '20g') \\\n",
    ".config('spark.driver.memory', '10g') \\\n",
    ".config('spark.sql.shuffle.partitions', 6144) \\\n",
    ".config('spark.executor.instances', '24') \\\n",
    ".config('spark.executor.cores', '5') \\\n",
    ".config('spark.rpc.message.maxSize', '1024') \\\n",
    ".config('spark.yarn.dist.archives', 'environment.tar.gz#environment') \\\n",
    ".getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "talented-terrace",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "SedonaRegistrator.registerAll(spark)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "individual-italy",
   "metadata": {},
   "source": [
    "## Second, read datasets"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "closed-updating",
   "metadata": {},
   "source": [
    "### Read ICESat-2 data, Spark have different types of input methods, here we read from Hadoop file system, as our Spark 2.4 is built based on Hadoop Yarn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "narrow-foundation",
   "metadata": {},
   "outputs": [],
   "source": [
    "is2_df_raw = spark.read.option(\"header\",True).option('inferSchema', True).csv(\"ICE_Sat_hour_cycle_orbits_split\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "engaging-collapse",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "79090"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "is2_df_raw.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "governing-trinidad",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<function __main__.orbit_date(s)>"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# extract collecting time of the orbit from filename, only at level of day for temporal granularity for now\n",
    "from datetime import datetime\n",
    "def orbit_date(s):\n",
    "    length = len(s)\n",
    "    date_str = s[(length-19):(length-8)]\n",
    "    date_obj = datetime.strptime(date_str, '%d-%b-%Y')\n",
    "#   return date_obj.strftime(\"%Y-%m-%d\")\n",
    "    return date_obj.strftime(\"%Y-%m-%d\")\n",
    "\n",
    "# Add date UDF\n",
    "spark.udf.register(\"orbit_date\", orbit_date)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "forty-assistant",
   "metadata": {},
   "outputs": [],
   "source": [
    "# is2_df_raw.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "unauthorized-operation",
   "metadata": {},
   "outputs": [],
   "source": [
    "is2_df_raw = is2_df_raw.withColumn('date_date_type', (F.unix_timestamp(\"date_day\") + F.col(\"day_hour\") * 3600).cast('timestamp'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "blocked-horizontal",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+--------------+-------+---------+-------------------+--------+--------------------+--------------------+-------------------+\n",
      "|           date_day|      split_id|day_key|day_cycle|     day_exact_time|day_hour|         Description|     formed_line_WKT|     date_date_type|\n",
      "+-------------------+--------------+-------+---------+-------------------+--------+--------------------+--------------------+-------------------+\n",
      "|2019-12-23 00:00:00|            -1|   1330|        5|2019-12-23 00:00:08|       0|RGT 1330 23-Dec-2...|LINESTRING (106.7...|2019-12-23 00:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1330|        5|2019-12-23 01:00:08|       1|RGT 1330 23-Dec-2...|LINESTRING (-92.1...|2019-12-23 01:00:00|\n",
      "|2019-12-23 00:00:00|45603962748928|   1331|        5|2019-12-23 01:04:25|       1|RGT 1331 23-Dec-2...|LINESTRING (-93.8...|2019-12-23 01:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1331|        5|2019-12-23 01:28:25|       1|RGT 1331 23-Dec-2...|LINESTRING (127.8...|2019-12-23 01:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1331|        5|2019-12-23 02:00:25|       2|RGT 1331 23-Dec-2...|LINESTRING (70.78...|2019-12-23 02:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1332|        5|2019-12-23 02:38:43|       2|RGT 1332 23-Dec-2...|LINESTRING (-117....|2019-12-23 02:00:00|\n",
      "|2019-12-23 00:00:00|30537217474562|   1332|        5|2019-12-23 03:00:43|       3|RGT 1332 23-Dec-2...|LINESTRING (-141....|2019-12-23 03:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1332|        5|2019-12-23 03:02:43|       3|RGT 1332 23-Dec-2...|LINESTRING (103.6...|2019-12-23 03:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1332|        5|2019-12-23 04:00:43|       4|RGT 1332 23-Dec-2...|LINESTRING (-135....|2019-12-23 04:00:00|\n",
      "|2019-12-23 00:00:00|12360915877888|   1333|        5|2019-12-23 04:13:00|       4|RGT 1333 23-Dec-2...|LINESTRING (-141....|2019-12-23 04:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1333|        5|2019-12-23 04:36:00|       4|RGT 1333 23-Dec-2...|LINESTRING (168.0...|2019-12-23 04:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1333|        5|2019-12-23 05:00:00|       5|RGT 1333 23-Dec-2...|LINESTRING (27.29...|2019-12-23 05:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1334|        5|2019-12-23 05:47:17|       5|RGT 1334 23-Dec-2...|LINESTRING (-164....|2019-12-23 05:00:00|\n",
      "|2019-12-23 00:00:00|34024730918912|   1334|        5|2019-12-23 06:00:17|       6|RGT 1334 23-Dec-2...|LINESTRING (-170....|2019-12-23 06:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1334|        5|2019-12-23 06:08:17|       6|RGT 1334 23-Dec-2...|LINESTRING (178.4...|2019-12-23 06:00:00|\n",
      "|2019-12-23 00:00:00|17961553231872|   1334|        5|2019-12-23 07:00:17|       7|RGT 1334 23-Dec-2...|LINESTRING (-170....|2019-12-23 07:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1334|        5|2019-12-23 07:05:17|       7|RGT 1334 23-Dec-2...|LINESTRING (179.5...|2019-12-23 07:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1335|        5|2019-12-23 07:21:35|       7|RGT 1335 23-Dec-2...|LINESTRING (171.5...|2019-12-23 07:00:00|\n",
      "|2019-12-23 00:00:00|42245298323457|   1335|        5|2019-12-23 08:00:35|       8|RGT 1335 23-Dec-2...|LINESTRING (-16.8...|2019-12-23 08:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1335|        5|2019-12-23 08:33:35|       8|RGT 1335 23-Dec-2...|LINESTRING (175.0...|2019-12-23 08:00:00|\n",
      "+-------------------+--------------+-------+---------+-------------------+--------+--------------------+--------------------+-------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "is2_df_raw.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "sharp-stand",
   "metadata": {},
   "outputs": [],
   "source": [
    "# filter date range\n",
    "is2_df_raw = is2_df_raw.filter(F.col('date_date_type') <= F.lit(end_date)).filter(F.col('date_date_type') >= F.lit(start_date))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "electric-right",
   "metadata": {},
   "outputs": [],
   "source": [
    "# print(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "celtic-beijing",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "9929"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "is2_df_raw.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "fifteen-battle",
   "metadata": {},
   "outputs": [],
   "source": [
    "is2_df_raw.createOrReplaceTempView(\"is2_df_raw\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "passive-circular",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extract Spatial information from the WKT column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "potential-breeding",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|               orbit|day_key|day_cycle|     date_date_type|     day_exact_time|day_hour|         Description|is_timestamp|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|LINESTRING (-124....|    935|        6|2020-02-26 00:00:00|2020-02-26 00:00:15|       0|RGT 935 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (-124....|    936|        6|2020-02-26 00:00:00|2020-02-26 00:00:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (97.63...|    936|        6|2020-02-26 00:00:00|2020-02-26 00:24:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (38.44...|    936|        6|2020-02-26 01:00:00|2020-02-26 01:00:32|       1|RGT 936 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (-147....|    937|        6|2020-02-26 01:00:00|2020-02-26 01:34:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (161.0...|    937|        6|2020-02-26 01:00:00|2020-02-26 01:57:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (37.46...|    937|        6|2020-02-26 02:00:00|2020-02-26 02:00:50|       2|RGT 937 26-Feb-20...|  1582700400|\n",
      "|LINESTRING (-168....|    937|        6|2020-02-26 03:00:00|2020-02-26 03:00:50|       3|RGT 937 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (-171....|    938|        6|2020-02-26 03:00:00|2020-02-26 03:09:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (178.8...|    938|        6|2020-02-26 03:00:00|2020-02-26 03:27:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# generate Linestring using ST_GeomFromWKT() only keep certain columns\n",
    "\n",
    "is2_df = spark.sql(\"select ST_GeomFromWKT(formed_line_WKT) as orbit, day_key, day_cycle, date_date_type, day_exact_time, day_hour, Description from is2_df_raw\")\n",
    "\n",
    "is2_df = is2_df.withColumn('is_timestamp', F.unix_timestamp(F.col('date_date_type')))\n",
    "is2_df.createOrReplaceTempView(\"is2_df\")\n",
    "\n",
    "## Show the schema of the table\n",
    "spark.table(\"is2_df\").show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "protective-uzbekistan",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extract data within potential time slots\n",
    "import time\n",
    "import datetime\n",
    "\n",
    "start_timestamp = time.mktime(datetime.datetime.strptime(start_date, \"%Y-%m-%d\").timetuple())\n",
    "start_timestamp = start_timestamp - hour_delay * 3600 # for potential time delay\n",
    "\n",
    "end_timestamp = time.mktime(datetime.datetime.strptime(end_date, \"%Y-%m-%d\").timetuple())\n",
    "end_timestamp = end_timestamp + hour_delay * 3600 # for potential time delay"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "ceramic-cutting",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|               orbit|day_key|day_cycle|     date_date_type|     day_exact_time|day_hour|         Description|is_timestamp|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|LINESTRING (-124....|    935|        6|2020-02-26 00:00:00|2020-02-26 00:00:15|       0|RGT 935 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (-124....|    936|        6|2020-02-26 00:00:00|2020-02-26 00:00:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (97.63...|    936|        6|2020-02-26 00:00:00|2020-02-26 00:24:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (38.44...|    936|        6|2020-02-26 01:00:00|2020-02-26 01:00:32|       1|RGT 936 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (-147....|    937|        6|2020-02-26 01:00:00|2020-02-26 01:34:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (161.0...|    937|        6|2020-02-26 01:00:00|2020-02-26 01:57:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (37.46...|    937|        6|2020-02-26 02:00:00|2020-02-26 02:00:50|       2|RGT 937 26-Feb-20...|  1582700400|\n",
      "|LINESTRING (-168....|    937|        6|2020-02-26 03:00:00|2020-02-26 03:00:50|       3|RGT 937 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (-171....|    938|        6|2020-02-26 03:00:00|2020-02-26 03:09:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (178.8...|    938|        6|2020-02-26 03:00:00|2020-02-26 03:27:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "is2_df = is2_df.filter(F.col('is_timestamp') <= end_timestamp).filter(F.col('is_timestamp') >= start_timestamp)\n",
    "is2_df.createOrReplaceTempView(\"is2_df\")\n",
    "\n",
    "## Show the schema of the table\n",
    "spark.table(\"is2_df\").show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "severe-formula",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "id": "interested-antique",
   "metadata": {},
   "source": [
    "### Read sentinel data for spatial join\n",
    "the file contains Sentinel-2 metadata that collect for one year in 2020, contains 2052387 records in total\n",
    "the format of meta_s2_2020_wkt.csv is in CSV with | sep, and footprint is in WTS format\n",
    "the dataset is also read from Hadoop file system\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "criminal-platform",
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "sentinel_df = spark.read.option(\"header\",True).options(delimiter='|').csv(\"meta_s2_2020_wkt.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "elect-warren",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- _c0: string (nullable = true)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- timestamp: string (nullable = true)\n",
      " |-- wkt_geo: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "mysterious-finish",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "private-nirvana",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+--------------------+\n",
      "|_c0|            s2_index|           timestamp|             wkt_geo|\n",
      "+---+--------------------+--------------------+--------------------+\n",
      "|  0|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((166.768...|\n",
      "|  1|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.185...|\n",
      "|  2|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.144...|\n",
      "|  3|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.663...|\n",
      "|  4|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((167.573...|\n",
      "|  5|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((168.518...|\n",
      "|  6|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.588...|\n",
      "|  7|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.555...|\n",
      "|  8|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.641...|\n",
      "|  9|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.471...|\n",
      "| 10|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.437...|\n",
      "| 11|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((170.575...|\n",
      "| 12|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.494...|\n",
      "| 13|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((171.457...|\n",
      "| 14|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((171.989...|\n",
      "| 15|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((170.999...|\n",
      "| 16|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.559...|\n",
      "| 17|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.524...|\n",
      "| 18|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((168.494...|\n",
      "| 19|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.425...|\n",
      "+---+--------------------+--------------------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.show(20)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "metallic-three",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_df = sentinel_df.withColumn('date_date_type', F.to_timestamp(F.udf(lambda x: x[:10] + ' ' + x[11:19])(F.col('timestamp')), 'yyyy-MM-dd HH:mm:ss'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "id": "concerned-expression",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+--------------------+-------------------+\n",
      "|_c0|            s2_index|           timestamp|             wkt_geo|     date_date_type|\n",
      "+---+--------------------+--------------------+--------------------+-------------------+\n",
      "|  0|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((166.768...|2020-01-01 00:01:29|\n",
      "|  1|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.185...|2020-01-01 00:01:20|\n",
      "|  2|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.144...|2020-01-01 00:01:05|\n",
      "|  3|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.663...|2020-01-01 00:01:27|\n",
      "|  4|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((167.573...|2020-01-01 00:00:51|\n",
      "|  5|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((168.518...|2020-01-01 00:00:38|\n",
      "|  6|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.588...|2020-01-01 00:01:17|\n",
      "|  7|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.555...|2020-01-01 00:01:02|\n",
      "|  8|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.641...|2020-01-01 00:01:27|\n",
      "|  9|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.471...|2020-01-01 00:01:12|\n",
      "+---+--------------------+--------------------+--------------------+-------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "incomplete-fossil",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "id": "constitutional-being",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- _c0: string (nullable = true)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- timestamp: string (nullable = true)\n",
      " |-- wkt_geo: string (nullable = true)\n",
      " |-- date_date_type: timestamp (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "id": "sticky-recruitment",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "2052387"
      ]
     },
     "execution_count": 32,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sentinel_df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "id": "everyday-helmet",
   "metadata": {},
   "outputs": [],
   "source": [
    "# extract spatial information using WKT format footprint\n",
    "sentinel_df.createOrReplaceTempView(\"sentinel_df\")\n",
    "sentinel_sedona = spark.sql(\"select ST_GeomFromWKT(sentinel_df.wkt_geo) as geometry, sentinel_df.s2_index as s2_index, sentinel_df.date_date_type as date_date_type from sentinel_df\")\n",
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "sentinel_sedona = sentinel_sedona.withColumnRenamed('date_date_type', 'Sentinel_date_date_type')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "id": "determined-genius",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|\n",
      "+--------------------+--------------------+-----------------------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|\n",
      "+--------------------+--------------------+-----------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Extract data within potential time slots\n",
    "\n",
    "sentinel_sedona = sentinel_sedona.filter(F.unix_timestamp(\"Sentinel_date_date_type\") <= end_timestamp).filter(F.unix_timestamp(\"Sentinel_date_date_type\") >= start_timestamp)\n",
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "\n",
    "sentinel_sedona.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "id": "continent-annual",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "2042659"
      ]
     },
     "execution_count": 35,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sentinel_sedona.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "id": "mineral-addiction",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----+----------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|hour|      date|\n",
      "+--------------------+--------------------+-----------------------+----+----------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|   0|2020-01-01|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|   0|2020-01-01|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|   0|2020-01-01|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|   0|2020-01-01|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|   0|2020-01-01|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|   0|2020-01-01|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|   0|2020-01-01|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|   0|2020-01-01|\n",
      "+--------------------+--------------------+-----------------------+----+----------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_sedona = sentinel_sedona.withColumn('hour', F.hour(F.col('Sentinel_date_date_type')))\n",
    "sentinel_sedona = sentinel_sedona.withColumn('date', F.to_date(F.col('Sentinel_date_date_type')))\n",
    "\n",
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "\n",
    "sentinel_sedona.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "id": "complete-armor",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1577854800.0"
      ]
     },
     "execution_count": 37,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# generate temporal slicing index\n",
    "\n",
    "# find start timestamp\n",
    "import datetime\n",
    "import time\n",
    " \n",
    "# assigned regular string date\n",
    "start_date_datetype = datetime.datetime(2020, 1, 1, 0, 0)\n",
    "# print(\"date_time =>\",date_time)\n",
    " \n",
    "start_date_timestamp = time.mktime(start_date_datetype.timetuple())\n",
    "start_date_timestamp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "id": "legendary-august",
   "metadata": {},
   "outputs": [],
   "source": [
    "# get time_duration and hour_delay into seconds for comparison\n",
    "time_index_hour_length_second = time_index_hour_length * 3600\n",
    "hour_delay_second = hour_delay * 3600"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "id": "touched-horizon",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "259200"
      ]
     },
     "execution_count": 39,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "time_index_hour_length_second"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "id": "suspended-despite",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona_time_index = sentinel_sedona.withColumn('time_index', F.floor((F.unix_timestamp('Sentinel_date_date_type') - F.lit(start_date_timestamp)) / F.lit(time_index_hour_length_second)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "id": "invalid-forty",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|hour|      date|time_index|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|   0|2020-01-01|         0|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|   0|2020-01-01|         0|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|   0|2020-01-01|         0|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.437...|20191231T235959_2...|    2020-01-01 00:00:58|   0|2020-01-01|         0|\n",
      "|POLYGON ((170.575...|20191231T235959_2...|    2020-01-01 00:01:30|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.494...|20191231T235959_2...|    2020-01-01 00:01:25|   0|2020-01-01|         0|\n",
      "|POLYGON ((171.457...|20191231T235959_2...|    2020-01-01 00:01:07|   0|2020-01-01|         0|\n",
      "|POLYGON ((171.989...|20191231T235959_2...|    2020-01-01 00:00:53|   0|2020-01-01|         0|\n",
      "|POLYGON ((170.999...|20191231T235959_2...|    2020-01-01 00:01:19|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.559...|20191231T235959_2...|    2020-01-01 00:00:48|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.524...|20191231T235959_2...|    2020-01-01 00:00:34|   0|2020-01-01|         0|\n",
      "|POLYGON ((168.494...|20191231T235959_2...|    2020-01-01 00:00:19|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.425...|20191231T235959_2...|    2020-01-01 00:00:08|   0|2020-01-01|         0|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_sedona_time_index.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "id": "three-carpet",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona_time_index_early = sentinel_sedona_time_index.filter(((F.unix_timestamp('Sentinel_date_date_type') - F.lit(start_date_timestamp)) % F.lit(time_index_hour_length_second)) <= hour_delay_second)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "id": "interracial-miracle",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|hour|      date|time_index|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|   0|2020-01-01|         0|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|   0|2020-01-01|         0|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|   0|2020-01-01|         0|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.437...|20191231T235959_2...|    2020-01-01 00:00:58|   0|2020-01-01|         0|\n",
      "|POLYGON ((170.575...|20191231T235959_2...|    2020-01-01 00:01:30|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.494...|20191231T235959_2...|    2020-01-01 00:01:25|   0|2020-01-01|         0|\n",
      "|POLYGON ((171.457...|20191231T235959_2...|    2020-01-01 00:01:07|   0|2020-01-01|         0|\n",
      "|POLYGON ((171.989...|20191231T235959_2...|    2020-01-01 00:00:53|   0|2020-01-01|         0|\n",
      "|POLYGON ((170.999...|20191231T235959_2...|    2020-01-01 00:01:19|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.559...|20191231T235959_2...|    2020-01-01 00:00:48|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.524...|20191231T235959_2...|    2020-01-01 00:00:34|   0|2020-01-01|         0|\n",
      "|POLYGON ((168.494...|20191231T235959_2...|    2020-01-01 00:00:19|   0|2020-01-01|         0|\n",
      "|POLYGON ((169.425...|20191231T235959_2...|    2020-01-01 00:00:08|   0|2020-01-01|         0|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_sedona_time_index_early.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "established-plain",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "id": "nuclear-audience",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona_time_index_late = sentinel_sedona_time_index.filter(((F.unix_timestamp('Sentinel_date_date_type') - F.lit(start_date_timestamp)) % F.lit(time_index_hour_length_second)) >= (time_index_hour_length_second - hour_delay_second))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "id": "intensive-screen",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|hour|      date|time_index|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "|POLYGON ((-113.30...|20200103T181741_2...|    2020-01-03 18:23:48|  18|2020-01-03|         0|\n",
      "|POLYGON ((-113.18...|20200103T181741_2...|    2020-01-03 18:23:39|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.01...|20200103T181741_2...|    2020-01-03 18:23:46|  18|2020-01-03|         0|\n",
      "|POLYGON ((-113.27...|20200103T181741_2...|    2020-01-03 18:23:33|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.03...|20200103T181741_2...|    2020-01-03 18:23:19|  18|2020-01-03|         0|\n",
      "|POLYGON ((-110.89...|20200103T181741_2...|    2020-01-03 18:23:50|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.12...|20200103T181741_2...|    2020-01-03 18:23:44|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.13...|20200103T181741_2...|    2020-01-03 18:23:30|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.15...|20200103T181741_2...|    2020-01-03 18:23:15|  18|2020-01-03|         0|\n",
      "|POLYGON ((-111.00...|20200103T181741_2...|    2020-01-03 18:23:49|  18|2020-01-03|         0|\n",
      "|POLYGON ((-110.29...|20200103T181741_2...|    2020-01-03 18:23:40|  18|2020-01-03|         0|\n",
      "|POLYGON ((-109.73...|20200103T181741_2...|    2020-01-03 18:23:26|  18|2020-01-03|         0|\n",
      "|POLYGON ((-111.00...|20200103T181741_2...|    2020-01-03 18:23:12|  18|2020-01-03|         0|\n",
      "|POLYGON ((-109.65...|20200103T181741_2...|    2020-01-03 18:23:18|  18|2020-01-03|         0|\n",
      "|POLYGON ((-109.83...|20200103T181741_2...|    2020-01-03 18:23:07|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.06...|20200103T181741_2...|    2020-01-03 18:23:04|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.06...|20200103T181741_2...|    2020-01-03 18:22:50|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.09...|20200103T181741_2...|    2020-01-03 18:22:38|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.16...|20200103T181741_2...|    2020-01-03 18:23:01|  18|2020-01-03|         0|\n",
      "|POLYGON ((-112.18...|20200103T181741_2...|    2020-01-03 18:22:46|  18|2020-01-03|         0|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_sedona_time_index_late.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "confused-indication",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "id": "negative-cannon",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona_dateforjoin = sentinel_sedona_time_index.withColumn('time_index_join', F.col('time_index')).union(sentinel_sedona_time_index_late.withColumn('time_index_join', (F.col('time_index') + 1))).union(sentinel_sedona_time_index_early.withColumn('time_index_join', (F.col('time_index') - 1)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "powerful-framework",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----+----------+----------+---------------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|hour|      date|time_index|time_index_join|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+---------------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.437...|20191231T235959_2...|    2020-01-01 00:00:58|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((170.575...|20191231T235959_2...|    2020-01-01 00:01:30|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.494...|20191231T235959_2...|    2020-01-01 00:01:25|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((171.457...|20191231T235959_2...|    2020-01-01 00:01:07|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((171.989...|20191231T235959_2...|    2020-01-01 00:00:53|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((170.999...|20191231T235959_2...|    2020-01-01 00:01:19|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.559...|20191231T235959_2...|    2020-01-01 00:00:48|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.524...|20191231T235959_2...|    2020-01-01 00:00:34|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((168.494...|20191231T235959_2...|    2020-01-01 00:00:19|   0|2020-01-01|         0|              0|\n",
      "|POLYGON ((169.425...|20191231T235959_2...|    2020-01-01 00:00:08|   0|2020-01-01|         0|              0|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+---------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_sedona_dateforjoin.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "id": "prospective-breakfast",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "2345319"
      ]
     },
     "execution_count": 48,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sentinel_sedona_dateforjoin.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "id": "enclosed-george",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "2042659"
      ]
     },
     "execution_count": 49,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sentinel_sedona_dateforjoin.drop_duplicates(subset=['s2_index']).count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "horizontal-subscriber",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "environmental-habitat",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "exotic-mapping",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "similar-creator",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "nervous-roulette",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "id": "great-reliance",
   "metadata": {},
   "source": [
    "## Third, spatio-temporal join Sentinel dataset and ICE-Sat 2 dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "id": "roman-present",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 3.1 join the two Sedona dataframe according the timestamps, hour level at now, consider time delay"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "id": "pleasant-aging",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona_dateforjoin_maxmin = sentinel_sedona_dateforjoin.withColumn('sentinel_max_timestamp', (F.unix_timestamp(\"Sentinel_date_date_type\") + hour_delay * 3600)).\\\n",
    "withColumn('sentinel_min_timestamp', (F.unix_timestamp(\"Sentinel_date_date_type\") - hour_delay * 3600))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "id": "frequent-foundation",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----+----------+----------+---------------+----------------------+----------------------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|hour|      date|time_index|time_index_join|sentinel_max_timestamp|sentinel_min_timestamp|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+---------------+----------------------+----------------------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|   0|2020-01-01|         0|              0|            1577876489|            1577833289|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|   0|2020-01-01|         0|              0|            1577876480|            1577833280|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|   0|2020-01-01|         0|              0|            1577876465|            1577833265|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|              0|            1577876487|            1577833287|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|   0|2020-01-01|         0|              0|            1577876451|            1577833251|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|   0|2020-01-01|         0|              0|            1577876438|            1577833238|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|   0|2020-01-01|         0|              0|            1577876477|            1577833277|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|   0|2020-01-01|         0|              0|            1577876462|            1577833262|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|   0|2020-01-01|         0|              0|            1577876487|            1577833287|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|   0|2020-01-01|         0|              0|            1577876472|            1577833272|\n",
      "|POLYGON ((169.437...|20191231T235959_2...|    2020-01-01 00:00:58|   0|2020-01-01|         0|              0|            1577876458|            1577833258|\n",
      "|POLYGON ((170.575...|20191231T235959_2...|    2020-01-01 00:01:30|   0|2020-01-01|         0|              0|            1577876490|            1577833290|\n",
      "|POLYGON ((169.494...|20191231T235959_2...|    2020-01-01 00:01:25|   0|2020-01-01|         0|              0|            1577876485|            1577833285|\n",
      "|POLYGON ((171.457...|20191231T235959_2...|    2020-01-01 00:01:07|   0|2020-01-01|         0|              0|            1577876467|            1577833267|\n",
      "|POLYGON ((171.989...|20191231T235959_2...|    2020-01-01 00:00:53|   0|2020-01-01|         0|              0|            1577876453|            1577833253|\n",
      "|POLYGON ((170.999...|20191231T235959_2...|    2020-01-01 00:01:19|   0|2020-01-01|         0|              0|            1577876479|            1577833279|\n",
      "|POLYGON ((169.559...|20191231T235959_2...|    2020-01-01 00:00:48|   0|2020-01-01|         0|              0|            1577876448|            1577833248|\n",
      "|POLYGON ((169.524...|20191231T235959_2...|    2020-01-01 00:00:34|   0|2020-01-01|         0|              0|            1577876434|            1577833234|\n",
      "|POLYGON ((168.494...|20191231T235959_2...|    2020-01-01 00:00:19|   0|2020-01-01|         0|              0|            1577876419|            1577833219|\n",
      "|POLYGON ((169.425...|20191231T235959_2...|    2020-01-01 00:00:08|   0|2020-01-01|         0|              0|            1577876408|            1577833208|\n",
      "+--------------------+--------------------+-----------------------+----+----------+----------+---------------+----------------------+----------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_sedona_dateforjoin_maxmin.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "id": "incorporated-discipline",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- orbit: geometry (nullable = false)\n",
      " |-- day_key: integer (nullable = true)\n",
      " |-- day_cycle: integer (nullable = true)\n",
      " |-- date_date_type: timestamp (nullable = true)\n",
      " |-- day_exact_time: timestamp (nullable = true)\n",
      " |-- day_hour: integer (nullable = true)\n",
      " |-- Description: string (nullable = true)\n",
      " |-- is_timestamp: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "is2_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "id": "wicked-jewelry",
   "metadata": {},
   "outputs": [],
   "source": [
    "is2_df = is2_df.withColumn('time_index_join', F.floor((F.unix_timestamp('date_date_type') - F.lit(start_date_timestamp)) / F.lit(time_index_hour_length_second)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "id": "unique-samba",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona_dateforjoin_maxmin.createOrReplaceTempView(\"sentinel_sedona_dateforjoin_maxmin\")\n",
    "is2_df.createOrReplaceTempView(\"is2_df\")\n",
    "test_temporal_indexed_spatial_join = spark.sql(\"SELECT * FROM sentinel_sedona_dateforjoin_maxmin, is2_df WHERE ST_Intersects(sentinel_sedona_dateforjoin_maxmin.geometry, is2_df.orbit) AND sentinel_sedona_dateforjoin_maxmin.time_index_join = is2_df.time_index_join\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "id": "promotional-charm",
   "metadata": {},
   "outputs": [],
   "source": [
    "# test_temporal_indexed_spatial_join.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "id": "south-general",
   "metadata": {},
   "outputs": [],
   "source": [
    "# import time\n",
    "# start_spatial_time = time.time()\n",
    "# test_spatial_join.select('uuid', 'Description', 'is_timestamp').write.format('com.databricks.spark.csv')\\\n",
    "#         .mode(\"overwrite\")\\\n",
    "#         .option('header', True)\\\n",
    "#         .save('sentinel_is2_intersect_pure_spatial')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "id": "fancy-drink",
   "metadata": {},
   "outputs": [],
   "source": [
    "# with open('speed_pure_spatial_join_month.txt', 'w') as f:\n",
    "#     f.write('pure spatial join time:')\n",
    "#     f.write(str(time.time() - start_spatial_time))\n",
    "#     f.write('/n')\n",
    "#     f.write('number of spatial intersections:')\n",
    "#     f.write(str(test_spatial_join.count()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 59,
   "id": "surprising-lithuania",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- geometry: geometry (nullable = false)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- Sentinel_date_date_type: timestamp (nullable = true)\n",
      " |-- hour: integer (nullable = true)\n",
      " |-- date: date (nullable = true)\n",
      " |-- time_index: long (nullable = true)\n",
      " |-- time_index_join: long (nullable = true)\n",
      " |-- sentinel_max_timestamp: long (nullable = true)\n",
      " |-- sentinel_min_timestamp: long (nullable = true)\n",
      " |-- orbit: geometry (nullable = false)\n",
      " |-- day_key: integer (nullable = true)\n",
      " |-- day_cycle: integer (nullable = true)\n",
      " |-- date_date_type: timestamp (nullable = true)\n",
      " |-- day_exact_time: timestamp (nullable = true)\n",
      " |-- day_hour: integer (nullable = true)\n",
      " |-- Description: string (nullable = true)\n",
      " |-- is_timestamp: long (nullable = true)\n",
      " |-- time_index_join: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "test_temporal_indexed_spatial_join.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "id": "white-apple",
   "metadata": {},
   "outputs": [],
   "source": [
    "test_ST_join = test_temporal_indexed_spatial_join.filter(F.col('sentinel_max_timestamp') >= F.col('is_timestamp')).filter(F.col('sentinel_min_timestamp') <= F.col('is_timestamp'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "id": "practical-pattern",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "start_temporal_time = time.time()\n",
    "test_ST_join.select('s2_index', 'Description', 'is_timestamp').drop_duplicates(subset=['s2_index', 'Description']).write.format('com.databricks.spark.csv')\\\n",
    "        .mode(\"overwrite\")\\\n",
    "        .option('header', True)\\\n",
    "        .save('sentinel_is2_intersect_spatial_temporal')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "id": "generous-dance",
   "metadata": {},
   "outputs": [],
   "source": [
    "with open('ICESpark_STSI_{}_months_{}_temporal_index_S2I2.txt'.format(str(duration_month), str(time_index_hour_length)), 'w') as f:\n",
    "    f.write('STSI time in total:')\n",
    "    f.write(str(time.time() - start_temporal_time))\n",
    "    f.write('\\n')\n",
    "    f.write('number of spatial intersections:')\n",
    "    f.write(str(test_ST_join.count()))\n",
    "    f.write('\\n')\n",
    "    f.write('from and to date')\n",
    "    f.write(start_date)\n",
    "    f.write('\\n')\n",
    "    f.write(end_date)\n",
    "    f.write('\\n')\n",
    "    f.write('ICEsat2 data count:')\n",
    "    f.write(str(is2_df_raw.count()))\n",
    "    f.write('\\n')\n",
    "\n",
    "    f.write('Sentinel 2 data count:')\n",
    "    f.write(str(sentinel_sedona.count()))\n",
    "    f.write('\\n')\n",
    "    f.write('temporal index:')\n",
    "    f.write(str(time_index_hour_length))\n",
    "    \n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "id": "honey-objective",
   "metadata": {},
   "outputs": [],
   "source": [
    "# test_ST_join.select('s2_index', 'Description', 'is_timestamp').drop_duplicates(subset=['s2_index', 'Description']).count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "norwegian-wholesale",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "id": "exposed-point",
   "metadata": {},
   "outputs": [],
   "source": [
    "# \"fhldsai{}_{}\".format(str(2), 'se')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 65,
   "id": "offensive-variable",
   "metadata": {},
   "outputs": [],
   "source": [
    "# boundaries in the format of polygon）"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
